Pythonで並行作業を書こうとして、ProcessPoolExecutorではまったお話
サーモン大好き横山です。
python 3.2から入った concurrent.futures を使って並行処理を書いたときのはまったことを書いて行こうと思います。
検証環境
$ sw_vers ProductName: Mac OS X ProductVersion: 10.14.6 BuildVersion: 18G2022 $ python3 -V Python 3.7.5 $ pwd /path/to $ python3 -mvenv venv; . venv/bin/activate
概要
大きいタスクを concurrent.futures.ProcessPoolExecutor
を利用し、大きいタスク内の小さなタスクを concurrent.futures.ThreadPoolExecutor
を使用して、並行作業しようとしました。
処理イメージ
コード
実行結果
$ python main.py num = 0 num = 1 num = 2 concurrent.futures.process._RemoteTraceback: """ Traceback (most recent call last): File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/process.py", line 205, in _sendback_result exception=exception)) File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 358, in put obj = _ForkingPickler.dumps(obj) File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) TypeError: can't pickle _thread.RLock objects """ The above exception was the direct cause of the following exception: Traceback (most recent call last): File "main.py", line 26, in <module> main() File "main.py", line 21, in main for inner_f in outer_f.result(): File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py", line 428, in result return self.__get_result() File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result raise self._exception TypeError: can't pickle _thread.RLock objects
原因
concurrent.futures.ProcessPoolExecutor
の結果を受け取るときに、 threading.RLock
が pickle
化できず、エラーになります。
concurrent/futures/process.py#L201-L208
def _sendback_result(result_queue, work_id, result=None, exception=None): """Safely send back the given result or exception""" try: result_queue.put(_ResultItem(work_id, result=result, exception=exception)) except BaseException as e: exc = _ExceptionWithTraceback(e, e.__traceback__) result_queue.put(_ResultItem(work_id, exception=exc))
これは、conccurent.fetures.Future
クラスのメンバー変数 self._condition
が持っています。
concurrent/futures/_base.py#L309-L319
class Future(object): """Represents the result of an asynchronous computation.""" def __init__(self): """Initializes the future. Should not be called by clients.""" self._condition = threading.Condition() self._state = PENDING self._result = None self._exception = None self._waiters = [] self._done_callbacks = []
対策
大きい方のタスクの ProcessPoolExecutor
が pickle
化できない 返り値を受け取ろうしているが問題なので、 outer_concurrent
の戻り値を変更する。もしくは、multiprocessにこだわりがなければ、 大きい方のタスクのexecutorをconcurrent.futures.ThreadPoolExecutor
に変更すると解決します。
戻り値を細工する場合
def outer_concurrent(): with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: futures = [executor.submit(inner_concurrent, i) for i in range(3)] return [f.result() for f in futures] # resultの内容だけ返すように修正 def main(): with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor: futures = [executor.submit(outer_concurrent) for _ in range(1)] for outer_f in futures: for inner_f in outer_f.result(): print(f"result = {inner_f}") # resultの内容だけになったので、 「.result()」を削除
大きいタスクのexecutorをThreadPoolExecutorにする場合
def main(): with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: futures = [executor.submit(outer_concurrent) for _ in range(1)] for outer_f in futures: for inner_f in outer_f.result(): print(f"result = {inner_f.result()}")
変更後の実行結果
$ python main.py num = 0 num = 1 num = 2 result = 0 result = 1 result = 4
まとめ
multiprocessing、threadingを意識せず使用できる concurrent.futres
ですが、便利な分エラーが起こったときの調査が意外と大変になりがちです。
そんな人達の助力になれば幸いです。
事業開発部ではソフトウェアエンジニアを募集中です
現在私はクラスメソッドの事業開発部で prismatix というサービスの開発に携わっています。 事業開発部ではソフトウェアエンジニアを募集しています。
もし興味のある方がいましたら、こちらのページ を見ていただけますと幸いです。